/**
 *    Copyright (C) 2025 EloqData Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under either of the following two licenses:
 *    1. GNU Affero General Public License, version 3, as published by the Free
 *    Software Foundation.
 *    2. GNU General Public License as published by the Free Software
 *    Foundation; version 2 of the License.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License or GNU General Public License for more
 *    details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    and GNU General Public License V2 along with this program.  If not, see
 *    <http://www.gnu.org/licenses/>.
 *
 */
#pragma once

#include <brpc/redis.h>
#include <brpc/socket.h>

#include <algorithm>
#include <chrono>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "absl/container/flat_hash_set.h"
#include "redis_handler.h"
#include "redis_object.h"
#include "redis_stats.h"

namespace EloqKV
{
class PubSubManager;

// State carried between successive calls of a bucket-based scan.
// Instances are held per connection through std::unique_ptr (see
// RedisConnectionContext::bucket_scan_cursors).
struct BucketScanCursor
{
    BucketScanCursor() = default;

    // Identifier of this cursor; zero until one is assigned.
    uint64_t cursor_id_ = 0;
    // Object type associated with the scan; Unknown by default.
    // NOTE(review): presumably the TYPE filter of the scan command -- confirm.
    RedisObjectType obj_type_ = RedisObjectType::Unknown;
    // NOTE(review): presumably the MATCH pattern of the scan command --
    // confirm against the code that fills it in.
    std::string cmd_pattern_;

    // Position inside cache_.
    // NOTE(review): looks like the index of the next entry to hand out --
    // confirm.
    size_t cache_idx_ = 0;
    // Buffered strings for this cursor (assumed to be scanned keys -- TODO
    // confirm).
    std::vector<std::string> cache_;
    // Save point handed to / filled by the tx service bucket scan.
    txservice::BucketScanSavePoint save_point_;
};

// Per-connection state attached to a brpc connection (derives from
// brpc::ConnectionContext). It holds the reply buffer, authentication and
// database selection, transaction handles, cached scan cursors and the
// pub/sub subscriptions of one client connection.
//
// All non-inline member functions are defined outside this header.
class RedisConnectionContext : public brpc::ConnectionContext
{
public:
    // Builds a context that is not bound to a socket or a PubSubManager.
    // Unlike the two-argument constructor, this one does not call
    // RedisStats::IncrConnReceived().
    RedisConnectionContext() : output(&arena)
    {
    }

    // Builds a context bound to `sock` and `mgr`.
    //
    // `sock` must not be null: it is dereferenced to read its last active
    // time. connect_time_us is set to the smaller of that value and the
    // current system clock reading (microseconds since the epoch).
    explicit RedisConnectionContext(brpc::Socket *sock, PubSubManager *mgr)
        : socket(sock),
          // The explicit <int64_t> keeps template deduction from failing on
          // toolchains where int64_t and the rep type of
          // std::chrono::microseconds are different integer types.
          connect_time_us(std::min<int64_t>(
              sock->last_active_time_us(),
              std::chrono::duration_cast<std::chrono::microseconds>(
                  std::chrono::system_clock::now().time_since_epoch())
                  .count())),
          output(&arena),
          pub_sub_mgr(mgr)
    {
        RedisStats::IncrConnReceived();
    }

    ~RedisConnectionContext() override;

    // Pub/sub subscription management for exact channel names.
    void SubscribeChannel(std::string_view chan);

    void UnsubscribeChannel(std::string_view chan);

    // Pub/sub subscription management for channel patterns.
    void SubscribePattern(std::string_view pattern);

    void UnsubscribePattern(std::string_view pattern);

    // NOTE(review): presumably the combined number of subscribed channels and
    // patterns -- confirm against the definition.
    int SubscriptionsCount() const;

    // Accessor for the reply object of this connection (see `output`).
    brpc::RedisReply *GetOutput();

    // NOTE(review): presumably writes `output` to the socket and reports
    // success -- confirm against the definition.
    bool FlushOutput();

    // Cache cursor content and return a hash key to fetch next.
    uint64_t CacheScanCursor(std::string_view cursor_content);

    // Find cursor content with cursor id (generated by "CacheScanCursor()").
    // NOTE(review): the bool presumably tells whether the cursor was found --
    // confirm against the definition.
    std::pair<bool, const std::string *> FindScanCursor(
        uint64_t cursor_id) const;

    // Socket this context belongs to; null when default-constructed.
    brpc::Socket *socket{};

    // If the user starts a transaction with the MULTI command,
    // multi_transaction_handler is the handler that runs the transaction
    // commands.
    std::unique_ptr<MultiTransactionHandler> multi_transaction_handler{};
    // Whether this connection has begun a transaction with the MULTI command.
    // If true, the commands received will be handled by
    // multi_transaction_handler.
    bool in_multi_transaction{};

    // If the user starts a transaction with the BEGIN command, the txm for
    // that tx will be set here.
    txservice::TransactionExecution *txm{nullptr};

    bool authenticated{false};

    // Currently selected database index; defaults to 0.
    int db_id{0};

    // Connection time in microseconds; see the two-argument constructor.
    int64_t connect_time_us{0};

    // CLIENT SETNAME, CLIENT GETNAME.
    std::string connection_name;

    // Client library name and version.
    // NOTE(review): presumably set through CLIENT SETINFO -- confirm.
    std::string lib_name;
    std::string lib_ver;

    // Keeps track of the recent scan cursors:
    // map<cursor_hash, pair<cursor, count>>.
    std::unordered_map<uint64_t, std::pair<std::string, int>> scan_cursors{};
    // The queue of returned cursors. Each element is a pointer to a
    // pair<const cursor_hash, pair<cursor, count>>, i.e. the value_type of
    // scan_cursors.
    std::list<std::pair<const uint64_t, std::pair<std::string, int>> *>
        scan_cursor_list{};

    // Registers a bucket scan cursor, taking ownership of `save_point`, and
    // returns a cursor id.
    // NOTE(review): exact id generation is in the out-of-line definition.
    uint64_t CreateBucketScanCursor(
        std::string_view cursor_content,
        std::unique_ptr<BucketScanCursor> save_point);

    void RemoveBucketScanCursor();
    uint64_t UpdateBucketScanCursor(std::string_view cursor_content);

    // NOTE(review): presumably returns nullptr when no cursor matches
    // `cursor_id` -- confirm against the definition.
    BucketScanCursor *FindBucketScanCursor(uint64_t cursor_id);
    // map<db_id, cursor>: one owned bucket scan cursor per key.
    std::unordered_map<int, std::unique_ptr<BucketScanCursor>>
        bucket_scan_cursors;

    // `arena` must stay declared before `output`: both constructors pass
    // &arena to output's constructor, and members are initialized in
    // declaration order.
    butil::Arena arena;
    brpc::RedisReply output;

    // Pub/sub manager given at construction; null when default-constructed.
    PubSubManager *pub_sub_mgr{};

    // Channels and patterns recorded for this connection.
    absl::flat_hash_set<std::string> subscribed_channels{};
    absl::flat_hash_set<std::string> subscribed_patterns{};

    friend class RedisServiceImpl;
    friend class PubSubManager;
};

}  // namespace EloqKV
